/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dubbo.registry.integration;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.URLBuilder;
import org.apache.dubbo.common.config.ConfigurationUtils;
import org.apache.dubbo.common.config.configcenter.DynamicConfiguration;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.common.utils.UrlUtils;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.Registry;
import org.apache.dubbo.registry.RegistryFactory;
import org.apache.dubbo.registry.RegistryService;
import org.apache.dubbo.rpc.Exporter;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Protocol;
import org.apache.dubbo.rpc.ProtocolServer;
import org.apache.dubbo.rpc.ProxyFactory;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.Cluster;
import org.apache.dubbo.rpc.cluster.Configurator;
import org.apache.dubbo.rpc.cluster.governance.GovernanceRuleRepository;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.ProviderModel;
import org.apache.dubbo.rpc.protocol.InvokerWrapper;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;

import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.apache.dubbo.common.constants.CommonConstants.APPLICATION_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.CLUSTER_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN;
import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_VERSION_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.EXTRA_KEYS_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.HIDE_KEY_PREFIX;
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.LOADBALANCE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.METHODS_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.MONITOR_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.RELEASE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.TIMESTAMP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
import static org.apache.dubbo.common.constants.FilterConstants.VALIDATION_KEY;
import static org.apache.dubbo.common.constants.QosConstants.ACCEPT_FOREIGN_IP;
import static org.apache.dubbo.common.constants.QosConstants.QOS_ENABLE;
import static org.apache.dubbo.common.constants.QosConstants.QOS_HOST;
import static org.apache.dubbo.common.constants.QosConstants.QOS_PORT;
import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.CONFIGURATORS_CATEGORY;
import static org.apache.dubbo.common.constants.RegistryConstants.OVERRIDE_PROTOCOL;
import static org.apache.dubbo.common.constants.RegistryConstants.PROVIDERS_CATEGORY;
import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_PROTOCOL;
import static org.apache.dubbo.common.constants.RegistryConstants.ROUTERS_CATEGORY;
import static org.apache.dubbo.common.utils.UrlUtils.classifyUrls;
import static org.apache.dubbo.registry.Constants.CONFIGURATORS_SUFFIX;
import static org.apache.dubbo.registry.Constants.CONSUMER_PROTOCOL;
import static org.apache.dubbo.registry.Constants.DEFAULT_REGISTRY;
import static org.apache.dubbo.registry.Constants.PROVIDER_PROTOCOL;
import static org.apache.dubbo.registry.Constants.REGISTER_IP_KEY;
import static org.apache.dubbo.registry.Constants.REGISTER_KEY;
import static org.apache.dubbo.registry.Constants.SIMPLIFIED_KEY;
import static org.apache.dubbo.remoting.Constants.BIND_IP_KEY;
import static org.apache.dubbo.remoting.Constants.BIND_PORT_KEY;
import static org.apache.dubbo.remoting.Constants.CHECK_KEY;
import static org.apache.dubbo.remoting.Constants.CODEC_KEY;
import static org.apache.dubbo.remoting.Constants.CONNECTIONS_KEY;
import static org.apache.dubbo.remoting.Constants.EXCHANGER_KEY;
import static org.apache.dubbo.remoting.Constants.SERIALIZATION_KEY;
import static org.apache.dubbo.rpc.Constants.DEPRECATED_KEY;
import static org.apache.dubbo.rpc.Constants.INTERFACES;
import static org.apache.dubbo.rpc.Constants.MOCK_KEY;
import static org.apache.dubbo.rpc.Constants.TOKEN_KEY;
import static org.apache.dubbo.rpc.cluster.Constants.EXPORT_KEY;
import static org.apache.dubbo.rpc.cluster.Constants.REFER_KEY;
import static org.apache.dubbo.rpc.cluster.Constants.WARMUP_KEY;
import static org.apache.dubbo.rpc.cluster.Constants.WEIGHT_KEY;

/**
 * RegistryProtocol
 */
public class RegistryProtocol implements Protocol {
    public static final String[] DEFAULT_REGISTER_PROVIDER_KEYS = {
            APPLICATION_KEY, CODEC_KEY, EXCHANGER_KEY, SERIALIZATION_KEY, CLUSTER_KEY, CONNECTIONS_KEY, DEPRECATED_KEY,
            GROUP_KEY, LOADBALANCE_KEY, MOCK_KEY, PATH_KEY, TIMEOUT_KEY, TOKEN_KEY, VERSION_KEY, WARMUP_KEY,
            WEIGHT_KEY, TIMESTAMP_KEY, DUBBO_VERSION_KEY, RELEASE_KEY
    };

    public static final String[] DEFAULT_REGISTER_CONSUMER_KEYS = {
            APPLICATION_KEY, VERSION_KEY, GROUP_KEY, DUBBO_VERSION_KEY, RELEASE_KEY
    };

    private final static Logger logger = LoggerFactory.getLogger(RegistryProtocol.class);
    private final Map<URL, NotifyListener> overrideListeners = new ConcurrentHashMap<>();
    private final Map<String, ServiceConfigurationListener> serviceConfigurationListeners = new ConcurrentHashMap<>();
    private final ProviderConfigurationListener providerConfigurationListener = new ProviderConfigurationListener();
    //To solve the problem of RMI repeated exposure port conflicts, the services that have been exposed are no longer exposed.
    //providerurl <--> exporter
    private final ConcurrentMap<String, ExporterChangeableWrapper<?>> bounds = new ConcurrentHashMap<>();
    private Cluster cluster;
    private Protocol protocol;
    private RegistryFactory registryFactory;
    private ProxyFactory proxyFactory;

    //Filter the parameters that do not need to be output in url(Starting with .)
    private static String[] getFilteredKeys(URL url) {
        Map<String, String> params = url.getParameters();
        if (CollectionUtils.isNotEmptyMap(params)) {
            return params.keySet().stream()
                    .filter(k -> k.startsWith(HIDE_KEY_PREFIX))
                    .toArray(String[]::new);
        } else {
            return new String[0];
        }
    }

    public void setCluster(Cluster cluster) {
        this.cluster = cluster;
    }

    public void setProtocol(Protocol protocol) {
        this.protocol = protocol;
    }

    public void setRegistryFactory(RegistryFactory registryFactory) {
        this.registryFactory = registryFactory;
    }

    public void setProxyFactory(ProxyFactory proxyFactory) {
        this.proxyFactory = proxyFactory;
    }

    @Override
    public int getDefaultPort() {
        return 9090;
    }

    public Map<URL, NotifyListener> getOverrideListeners() {
        return overrideListeners;
    }

    public void register(URL registryUrl, URL registeredProviderUrl) {
        Registry registry = registryFactory.getRegistry(registryUrl);
        registry.register(registeredProviderUrl);

        ProviderModel model = ApplicationModel.getProviderModel(registeredProviderUrl.getServiceKey());
        model.addStatedUrl(new ProviderModel.RegisterStatedURL(
                registeredProviderUrl,
                registryUrl,
                true
        ));
    }

    @Override
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        //这里得到的是注册中心的url,key是registry=registry://localhost:2181/org.apache.dubbo.registry.RegistryService?....
        URL registryUrl = getRegistryUrl(originInvoker);
        // url to export locally
        //这里得到的是服务提供者的url，key是export=dubbo://192.168.146.1:20880/org.apache.dubbo.demo.DemoService?anyhost=true&...
        URL providerUrl = getProviderUrl(originInvoker);

        // Subscribe the override data
        // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
        //  the same service. Because the subscribed is cached key with the name of the service, it causes the
        //  subscription information to cover.
        //创建了一个地址，协议是provider=provider://192.168.146.1:20880/org.apache.dubbo.demo.DemoService?anyhost=true&...
        //这里是为了兼容老版本的动态配置监听，生成的url协议是provider
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);

        //创建了一个监听器，这个监听器就是监听的远程配置中心的动态配置项，包含应用的配置和服务的配置
        /**
         * 这里创建的overrideSubscribeListener监听器是每一个服务一个监听器，然后放入缓存中，监听器中绑定的对象是originInvoker
         * originInvoker是一个服务调用者对象，服务消费的时候就会获取Invoker进行调用，这个监听器主要做的事情就是监听动态配置的内容
         * 动态配置分为了应用的配置和服务的配置；所以如果是修改了应用的配置：
         * 那么就需要触发应用下面的所有监听器的执行监听器的执行，进行重新注册，也就是重新导出服务
         * 如果修改的是某个服务的动态配置，那么触发的就是这个服务的监听器执行，也就是重新导出服务
         * 应用的配置路径是：/dubbo/config/dubbo/dubbo-demo-annotation-provider.configurators
         * 服务的配置路径是：/dubbo/config/dubbo/org.apache.dubbo.demo.DemoService.configurators(org.apache.dubbo.demo.DemoService服务名称)
         */
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

        /**
         * 这里做的事情就是新版的监听器的绑定，动态配置的远程读取，然后绑定监听器开启监听
         * 动态配置读取过后然后覆盖到服务注册地址providerUrl上的参数中
         */
        providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
        //export invoker
        //本地服务导出，本地服务导出做的事情就是将invoker放入对应的缓存中，这个缓存是在服务消费的时候获取调用
        //还做的事情就是开启netty或者tomcat
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

        // url to registry
        //registry获取的注册中心的工厂对象ZookeeperRegistryFactory
        final Registry registry = getRegistry(originInvoker);
        /**
         * 下面这个方法做的事情就是url参数删除，怎么说呢？就是比如说要注册到注册中心，注册的地址为：
         * dubbo://192.168.93.1:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-annotation-provider&bind.ip=192.168.93.1&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&
         * methods=sayHello,sayHelloAsync&pid=2124&release=&side=provider&timestamp=1614137784219
         * 但是这个地址上的ip和端口肯定不能通过动态配置进行修改的，比如这个url上的bind.ip和bind.port
         * 是不能修改的，所以你注册到注册中心上是没有任何意义的，所以类似下面的这个方法就是
         * 将注册地址providerUrl中的一些无用的参数或者说不能修改的一些参数进行剔除，剔除完成过后就是要注册到注册
         * 中心的完整地址
         */
        final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
        // decide if we need to delay publish
        boolean register = providerUrl.getParameter(REGISTER_KEY, true);
        if (register) {
            //将服务提供者提供的地址registeredProviderUrl注册到注册中心registryUrl上
            //registeredProviderUrl:是剔除一些无用参数后的最后注册地址：dubbo://192.168.93.1:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-annotation-provider&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=7716&release=&side=provider&timestamp=1614138010256
            //registryUrl注册中心地址：zookeeper://localhost:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-annotation-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F192.168.93.1%3A20880%2Forg.apache.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddubbo-demo-annotation-provider%26bind.ip%3D192.168.93.1%26bind.port%3D20880%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26methods%3DsayHello%2CsayHelloAsync%26pid%3D7716%26release%3D%26side%3Dprovider%26timestamp%3D1614138010256&pid=7716&timestamp=1614138010244
            register(registryUrl, registeredProviderUrl);
        }

        // Deprecated! Subscribe to override rules in 2.6.x or before.
        //这个是为了支持老版本的动态配置监听，老版本使用的是provider=provider://192.168.146.1:20880/org.apache.dubbo.demo.DemoService?anyhost=true&...
        //老版本监听的路径是：/dubbo/config/configurators
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

        //registeredProviderUrl是真实的注册地址
        exporter.setRegisterUrl(registeredProviderUrl);
        //overrideSubscribeUrl是为了兼容老版本的远程配置地址
        exporter.setSubscribeUrl(overrideSubscribeUrl);

        notifyExport(exporter);
        //Ensure that a new exporter instance is returned every time export
        //最后将invoker封装成一个DestroyableExporter返回
        return new DestroyableExporter<>(exporter);
    }

    private <T> void notifyExport(ExporterChangeableWrapper<T> exporter) {
        List<RegistryProtocolListener> listeners = ExtensionLoader.getExtensionLoader(RegistryProtocolListener.class)
                .getActivateExtension(exporter.getOriginInvoker().getUrl(), "registry.protocol.listener");
        if (CollectionUtils.isNotEmpty(listeners)) {
            for (RegistryProtocolListener listener : listeners) {
                listener.onExport(this, exporter);
            }
        }
    }

    private URL overrideUrlWithConfig(URL providerUrl, OverrideListener listener) {
        //这个监听器是在这个RegistryProtocol初始化的时候创建的，也就是只有一个，每个应用只有一个单例，这个监听器做的事情就是
        //将本应用中的动态配置读取出来，然后监听配置的变化，监听的具体操作是在providerConfigurationListener的初始化进行操作的
        //比如说应用中配置了一个动态配置，路径是/dubbo/config/dubbo-demo-annotation-provider.configurators,那么在启动的时候
        //providerConfigurationListener的初始化的时候要去读取这个应用的配置内容
        /**
         * 在providerConfigurationListener的构造方法中读取了应用的配置内容信息放入了list  configurators中
         * 所以在启动的时候如果配置了应用的动态配置，providerConfigurationListener会将动态配置内容读取了放入列表configurators中
         * 然后providerUrl是本方法传过来的服务提供者的地址，也就是dubbo://xxxxx****,所以这里调用了overrideUrl方法其实做的事情就
         * 是将应用的动态配置内容中涉及到的一些新的值覆盖到服务提供者的地址，这样就完成了动态配置的更新到服务注册地址上面
         */
        providerUrl = providerConfigurationListener.overrideUrl(providerUrl);
        //这里做的应用的配置更新，一样的调用zk去读取动态配置，动态配置读取过后放入监听器的configurators中
        //这里做的事情和providerConfigurationListener中的initWith做的事情一样，读取最小的动态配置内容，然后绑定监听器
        ServiceConfigurationListener serviceConfigurationListener = new ServiceConfigurationListener(providerUrl, listener);
        serviceConfigurationListeners.put(providerUrl.getServiceKey(), serviceConfigurationListener);
        //从监听器的configurators中更新配置信息到服务提供者providerUrl中，所以到这里就知道了对于动态配置来说，优先级是服务的动态配置优先级大于
        //应用的，如果应用和服务都配置了timeout，那么肯定是以服务的为准
        //不管是应用的动态配置还是服务的动态配置，当修改了配置过后，触发的监听器方法是notifyOverrides
        return serviceConfigurationListener.overrideUrl(providerUrl);
    }

    @SuppressWarnings("unchecked")
    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
        String key = getCacheKey(originInvoker);

        /**
         * 这里又是利用dubbo的SPI机制来实现的，SPI规定了url里面的参数protocol的名字决定了调用那个扩展，
         * 当然了我这里使用的是dubbo协议，所以下面的export调用的就是DubboProtocol，进入dubbo服务的本地导出
         * 本地导出也就是将invoker放入一个map中，然后消费者调用的时候就会通过这个map去获取对应的invoker，进行调用
         * 这里还做了一个事情就是启动netty或者tomcat，下面的调用路径是org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export(org.apache.dubbo.rpc.Invoker)
         * 最后放入缓存bounds中，key=根据invoker生成的
         */
        return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
            Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
            return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
        });
    }

    public <T> void reExport(Exporter<T> exporter, URL newInvokerUrl) {
        if (exporter instanceof ExporterChangeableWrapper) {
            ExporterChangeableWrapper<T> exporterWrapper = (ExporterChangeableWrapper<T>) exporter;
            Invoker<T> originInvoker = exporterWrapper.getOriginInvoker();
            reExport(originInvoker, newInvokerUrl);
        }
    }

    /**
     * Reexport the invoker of the modified url
     *
     * @param originInvoker
     * @param newInvokerUrl
     * @param <T>
     */
    @SuppressWarnings("unchecked")
    public <T> void reExport(final Invoker<T> originInvoker, URL newInvokerUrl) {
        //根据原始的invoker获取缓存key,和之前服务导出到本地的一样
        String key = getCacheKey(originInvoker);
        //获取本地的invoker，从缓存中获取，获取过后进行重新导出，重新导出完成过后也就修改到了本地的缓存
        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
        //获取注册中心的地址
        URL registeredUrl = exporter.getRegisterUrl();

        URL registryUrl = getRegistryUrl(originInvoker);
        //服务提供者url剔除无用的参数
        URL newProviderUrl = getUrlToRegistry(newInvokerUrl, registryUrl);

        // update local exporter
        //
        Invoker<T> invokerDelegate = new InvokerDelegate<T>(originInvoker, newInvokerUrl);
        //进行重新导出
        exporter.setExporter(protocol.export(invokerDelegate));

        // update registry
        if (!newProviderUrl.equals(registeredUrl)) {
            if (getProviderUrl(originInvoker).getParameter(REGISTER_KEY, true)) {
                Registry registry = getRegistry(originInvoker);
                logger.info("Try to unregister old url: " + registeredUrl);
                //重新导出需要重新注册到zookeeper中，所以这里先把之前的删除
                registry.unregister(registeredUrl);

                logger.info("Try to register new url: " + newProviderUrl);
                //这里执行重新导出，导出过程和正向的导出流程 一样
                registry.register(newProviderUrl);
            }

            ProviderModel.RegisterStatedURL statedUrl = getStatedUrl(registryUrl, newProviderUrl);
            statedUrl.setProviderUrl(newProviderUrl);
            exporter.setRegisterUrl(newProviderUrl);
        }
    }

    private ProviderModel.RegisterStatedURL getStatedUrl(URL registryUrl, URL providerUrl) {
        ProviderModel providerModel = ApplicationModel.getServiceRepository()
                .lookupExportedService(providerUrl.getServiceKey());

        List<ProviderModel.RegisterStatedURL> statedUrls = providerModel.getStatedUrl();
        return statedUrls.stream()
                .filter(u -> u.getRegistryUrl().equals(registryUrl)
                        && u.getProviderUrl().getProtocol().equals(providerUrl.getProtocol()))
                .findFirst().orElseThrow(() -> new IllegalStateException("There should have at least one registered url."));
    }

    /**
     * Get an instance of registry based on the address of invoker
     *
     * @param originInvoker
     * @return
     */
    protected Registry getRegistry(final Invoker<?> originInvoker) {
        URL registryUrl = getRegistryUrl(originInvoker);
        return registryFactory.getRegistry(registryUrl);
    }

    protected URL getRegistryUrl(Invoker<?> originInvoker) {
        URL registryUrl = originInvoker.getUrl();
        if (REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
            String protocol = registryUrl.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY);
            registryUrl = registryUrl.setProtocol(protocol).removeParameter(REGISTRY_KEY);
        }
        return registryUrl;
    }

    protected URL getRegistryUrl(URL url) {
        return URLBuilder.from(url)
                .setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY))
                .removeParameter(REGISTRY_KEY)
                .build();
    }


    /**
     * Return the url that is registered to the registry and filter the url parameter once
     *剔除url中无用的参数
     * @param providerUrl
     * @return url to registry.
     */
    private URL getUrlToRegistry(final URL providerUrl, final URL registryUrl) {
        //The address you see at the registry
        if (!registryUrl.getParameter(SIMPLIFIED_KEY, false)) {
            //剔除列表，在列表中都需要进行剔除，剔除其实很简单，就是调用hashmap的remove进行剔除
            return providerUrl.removeParameters(getFilteredKeys(providerUrl)).removeParameters(
                    MONITOR_KEY, BIND_IP_KEY, BIND_PORT_KEY, QOS_ENABLE, QOS_HOST, QOS_PORT, ACCEPT_FOREIGN_IP, VALIDATION_KEY,
                    INTERFACES);
        } else {
            String extraKeys = registryUrl.getParameter(EXTRA_KEYS_KEY, "");
            // if path is not the same as interface name then we should keep INTERFACE_KEY,
            // otherwise, the registry structure of zookeeper would be '/dubbo/path/providers',
            // but what we expect is '/dubbo/interface/providers'
            if (!providerUrl.getPath().equals(providerUrl.getParameter(INTERFACE_KEY))) {
                if (StringUtils.isNotEmpty(extraKeys)) {
                    extraKeys += ",";
                }
                extraKeys += INTERFACE_KEY;
            }
            String[] paramsToRegistry = getParamsToRegistry(DEFAULT_REGISTER_PROVIDER_KEYS
                    , COMMA_SPLIT_PATTERN.split(extraKeys));
            return URL.valueOf(providerUrl, paramsToRegistry, providerUrl.getParameter(METHODS_KEY, (String[]) null));
        }

    }

    private URL getSubscribedOverrideUrl(URL registeredProviderUrl) {
        return registeredProviderUrl.setProtocol(PROVIDER_PROTOCOL)
                .addParameters(CATEGORY_KEY, CONFIGURATORS_CATEGORY, CHECK_KEY, String.valueOf(false));
    }

    /**
     * Get the address of the providerUrl through the url of the invoker
     *
     * @param originInvoker
     * @return
     */
    private URL getProviderUrl(final Invoker<?> originInvoker) {
        String export = originInvoker.getUrl().getParameterAndDecoded(EXPORT_KEY);
        if (export == null || export.length() == 0) {
            throw new IllegalArgumentException("The registry export url is null! registry: " + originInvoker.getUrl());
        }
        return URL.valueOf(export);
    }

    /**
     * Get the key cached in bounds by invoker
     *
     * @param originInvoker
     * @return
     */
    private String getCacheKey(final Invoker<?> originInvoker) {
        URL providerUrl = getProviderUrl(originInvoker);
        String key = providerUrl.removeParameters("dynamic", "enabled").toFullString();
        return key;
    }

    @Override
    @SuppressWarnings("unchecked")
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        url = getRegistryUrl(url);
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }

        // group="a,b" or group="*"
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
        String group = qs.get(GROUP_KEY);
        if (group != null && group.length() > 0) {
            if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
                return doRefer(getMergeableCluster(), registry, type, url);
            }
        }
        return doRefer(cluster, registry, type, url);
    }

    private Cluster getMergeableCluster() {
        return ExtensionLoader.getExtensionLoader(Cluster.class).getExtension("mergeable");
    }

    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
        URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (directory.isShouldRegister()) {
            directory.setRegisteredConsumerUrl(subscribeUrl);
            //服务消费url注册到zeekeeper
            registry.register(directory.getRegisteredConsumerUrl());
        }
        //构造路由链，在dubbo-admin中是可以进行路由配置的，路由配置就是根据应用或者服务可以动态配置一些路由规则，交易请求过来的时候根据路由规则
        //进行处理，所以这里需要构造一个路由链，然后将构造出来的路由链赋值给directory中一个属性routerChain，而真正的服务调用的时候就可以
        //将服务提供者列表和路由链进行过滤，根据路由规则进行过滤得到一个新的路由列表，在进行负载均衡，最终选择一个服务提供者进行调用
        //这里构建路由链主要是以应用级别和服务级别添加了两个监听器和获取这两个条件路由的信息喷子到路由链中，而条件路由并没有在这里添加监听和获取
        /**
         * buildRouterChain这里发布和绑定了两个监听器，都是针对条件路由绑定的两个监听器，一个是应用条件路由监听器，一个是服务条件路由监听器
         * 但是标签路由在这个方法里面没有去绑定，为什么呢？dubbo目前的版本来说标签路由只能用于服务提供者，服务消费者如果要启用标签路由调用的话
         * 需要在请求里面设置标签，然后服务消费调用的时候根据标签去过滤，所以标签路由就需要知道服务提供者的名字，而代码执行到这里，服务提供者列表
         * 都还没有去注册中心获取到，所以服务提供者的名字也就还没有办法获取到，所以这里就不能绑定和获取标签路由的信息，所以下面的这个方法做的事情就是
         * 处理应用条件路由和服务条件路由，构建一个服务路由链，监听和获取的zookeeper地址是：
         * 新版本应用条件路由：/dubbo/config/dubbo/dubbo-provider.condition-router
         * 新版本服务条件路由：org.apache.dubbo.demo.DemoService.condition-router
         */
        directory.buildRouterChain(subscribeUrl);
        /**
         * 这里是发布和订阅，这里兼容了老版本的配置中心监听和获取；subscribe方法处理了很多过程，你想下，这个方法dubbo传入了服务消费的地址
         * 并且这个服务消费的地址是已经注册到zookeeper中的/dubbo/org.apache.dubbo.demo.DemoService/consumers,这个服务消费的地址是通过
         * @Reference来构建的，dubbo通过服务消费的url要去注册中心的proivders下获取对应的服务提供者进行引入服务，除了要引入服务以外，还有就是
         * 服务的远程配置，比如说服务的动态配置、应用的动态配置都需要获取和监听，并且服务提供者列表页需要进行监听，当有新的服务加入能够第一时间监听到
         * 而这里为了兼容老版本，所以简单来说就是subscribe方法兼容了老版本，处理的zookeeper路径有：
         * 老版本的动态配置：/dubbo/org.apache.dubbo.demo.DemoService/configurators
         * 老版本的路由配置：/dubbo/org.apache.dubbo.demo.DemoService/routers
         * 服务提供者列表：/dubbo/org.apache.dubbo.demo.DemoService/providers
         * 新版本的动态服务配置：/dubbo/config/dubbo/com.bml.architect.DemoService:default.configurators
         * 新版本的动态应用配置：/dubbo/config/dubbo/dubbo-provider.configurators
         * 除了上面需要获取和监听的以外，还有一个就是上面已经说了在构建路由器链路的时候只构建了两个条件路由（应用和服务），但是标签路由没有构建
         * 因为在调用buildRouterChain还没有获取到服务提供者，所以无法知道服务提供者的名字，也就没有办法获取和监控标签路由，但是下面的
         * subscribe方法会做的一个事情就是去获取和监听服务提供者也就是路径：/dubbo/org.apache.dubbo.demo.DemoService/providers
         * 所以获取到了服务提供者就可以进行标签路由的监听绑定和标签路由信息的获取，标签路由只能用于服务提供者，并且只支持应用级别的
         * 新版本的标签路由：/dubbo/config/dubbo/dubbo-provider.tag-router
         * 老版本的标签路由：/dubbo/org.apache.dubbo.demo.DemoService/routers/dubbo-provider.tag-router
         */
        //toSubscribeUrl是往url中添加了一个属性category，值是providers，routers，configurators,就是老版本的路由和动态配置的名称
        //下面这个方法调用完成过后得到的是invoker列表放入了RegistryDirectory中的invokers中，并且RegistryDirectory中的路由器链中也有
        //一个invokers设置了服务提供者列表,invoker是AsyncToSyncInvoker，在AsyncToSyncInvoker中包装了DubboInvoker
        directory.subscribe(toSubscribeUrl(subscribeUrl));

        //invoker是FailoverCluster，而得到的invoker对象是FailoverClusterInvoker，FailoverClusterInvoker中
        //包装了RegistryDirectory，RegistryDirectory中包装了invokers候选列表，所以服务调用的时候调用的是FailoverClusterInvoker
        //的invoker方法，从而也可以得到路由列表、服务提供者列表invokers，还可以获取很多信息从而丰富调用过程
        Invoker<T> invoker = cluster.join(directory);
        List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
        if (CollectionUtils.isEmpty(listeners)) {
            return invoker;
        }

        RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker, subscribeUrl);
        for (RegistryProtocolListener listener : listeners) {
            listener.onRefer(this, registryInvokerWrapper);
        }
        return registryInvokerWrapper;
    }

    public <T> void reRefer(Invoker<T> invoker, URL newSubscribeUrl) {
        if (!(invoker instanceof RegistryInvokerWrapper)) {
            return;
        }

        RegistryInvokerWrapper<T> invokerWrapper = (RegistryInvokerWrapper<T>) invoker;
        URL oldSubscribeUrl = invokerWrapper.getUrl();
        RegistryDirectory<T> directory = invokerWrapper.getDirectory();
        Registry registry = directory.getRegistry();
        registry.unregister(directory.getRegisteredConsumerUrl());
        directory.unSubscribe(toSubscribeUrl(oldSubscribeUrl));

        directory.setRegisteredConsumerUrl(newSubscribeUrl);
        registry.register(directory.getRegisteredConsumerUrl());
        directory.buildRouterChain(newSubscribeUrl);
        directory.subscribe(toSubscribeUrl(newSubscribeUrl));

        invokerWrapper.setInvoker(invokerWrapper.getCluster().join(directory));
        invokerWrapper.setUrl(newSubscribeUrl);
    }

    private static URL toSubscribeUrl(URL url) {
        return url.addParameter(CATEGORY_KEY, PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY);
    }

    private List<RegistryProtocolListener> findRegistryProtocolListeners(URL url) {
        return ExtensionLoader.getExtensionLoader(RegistryProtocolListener.class)
                .getActivateExtension(url, "registry.protocol.listener");
    }

    // available to test
    public String[] getParamsToRegistry(String[] defaultKeys, String[] additionalParameterKeys) {
        int additionalLen = additionalParameterKeys.length;
        String[] registryParams = new String[defaultKeys.length + additionalLen];
        System.arraycopy(defaultKeys, 0, registryParams, 0, defaultKeys.length);
        System.arraycopy(additionalParameterKeys, 0, registryParams, defaultKeys.length, additionalLen);
        return registryParams;
    }

    @Override
    public void destroy() {
        List<RegistryProtocolListener> listeners = ExtensionLoader.getExtensionLoader(RegistryProtocolListener.class)
                .getLoadedExtensionInstances();
        if (CollectionUtils.isNotEmpty(listeners)) {
            for (RegistryProtocolListener listener : listeners) {
                listener.onDestroy();
            }
        }

        List<Exporter<?>> exporters = new ArrayList<Exporter<?>>(bounds.values());
        for (Exporter<?> exporter : exporters) {
            exporter.unexport();
        }
        bounds.clear();

        ExtensionLoader.getExtensionLoader(GovernanceRuleRepository.class).getDefaultExtension()
                .removeListener(ApplicationModel.getApplication() + CONFIGURATORS_SUFFIX, providerConfigurationListener);
    }

    @Override
    public List<ProtocolServer> getServers() {
        return protocol.getServers();
    }

    //Merge the urls of configurators
    private static URL getConfigedInvokerUrl(List<Configurator> configurators, URL url) {
        if (configurators != null && configurators.size() > 0) {
            for (Configurator configurator : configurators) {
                url = configurator.configure(url);
            }
        }
        return url;
    }

    public static class InvokerDelegate<T> extends InvokerWrapper<T> {
        private final Invoker<T> invoker;

        /**
         * @param invoker
         * @param url     invoker.getUrl return this value
         */
        public InvokerDelegate(Invoker<T> invoker, URL url) {
            super(invoker, url);
            this.invoker = invoker;
        }

        public Invoker<T> getInvoker() {
            if (invoker instanceof InvokerDelegate) {
                return ((InvokerDelegate<T>) invoker).getInvoker();
            } else {
                return invoker;
            }
        }
    }

    private static class DestroyableExporter<T> implements Exporter<T> {

        private Exporter<T> exporter;

        public DestroyableExporter(Exporter<T> exporter) {
            this.exporter = exporter;
        }

        @Override
        public Invoker<T> getInvoker() {
            return exporter.getInvoker();
        }

        @Override
        public void unexport() {
            exporter.unexport();
        }
    }

    /**
     * Reexport: the exporter destroy problem in protocol
     * 1.Ensure that the exporter returned by registryprotocol can be normal destroyed
     * 2.No need to re-register to the registry after notify
     * 3.The invoker passed by the export method , would better to be the invoker of exporter
     */
    private class OverrideListener implements NotifyListener {
        private final URL subscribeUrl;
        private final Invoker originInvoker;


        private List<Configurator> configurators;

        public OverrideListener(URL subscribeUrl, Invoker originalInvoker) {
            this.subscribeUrl = subscribeUrl;
            this.originInvoker = originalInvoker;
        }

        /**
         * @param urls The list of registered information, is always not empty, The meaning is the same as the
         *             return value of {@link org.apache.dubbo.registry.RegistryService#lookup(URL)}.
         */
        @Override
        public synchronized void notify(List<URL> urls) {
            logger.debug("original override urls: " + urls);

            List<URL> matchedUrls = getMatchedUrls(urls, subscribeUrl.addParameter(CATEGORY_KEY,
                    CONFIGURATORS_CATEGORY));
            logger.debug("subscribe url: " + subscribeUrl + ", override urls: " + matchedUrls);

            // No matching results
            if (matchedUrls.isEmpty()) {
                return;
            }

            this.configurators = Configurator.toConfigurators(classifyUrls(matchedUrls, UrlUtils::isConfigurator))
                    .orElse(configurators);

            doOverrideIfNecessary();
        }

        /**
         * 这个是监听器的触发后需要调用的方法，触发过后会调用到指定的
         * invoker，这里的处理思路其实就是获取到invoker中当前的url
         * 然后获取远程服务器的url，如果内容发生变化，那么url的值也就发生了变化
         * 那么就需要进行重新导出服务，重新导出服务的流程就是重新再走一遍服务导出流程
         * 先要将之前的服务删除，然后进行重新导出
         */
        public synchronized void doOverrideIfNecessary() {
            final Invoker<?> invoker;
            if (originInvoker instanceof InvokerDelegate) {
                invoker = ((InvokerDelegate<?>) originInvoker).getInvoker();
            } else {
                invoker = originInvoker;
            }
            //The origin invoker

            //得到服务的原始url
            URL originUrl = RegistryProtocol.this.getProviderUrl(invoker);
            String key = getCacheKey(originInvoker);
            ExporterChangeableWrapper<?> exporter = bounds.get(key);
            if (exporter == null) {
                logger.warn(new IllegalStateException("error state, exporter should not be null"));
                return;
            }
            //The current, may have been merged many times
            //得到服务的当前使用的url
            URL currentUrl = exporter.getInvoker().getUrl();
            //Merged with this configuration
            //这里是远程动态配置的最新url，这个url下面做了合并，什么意思呢，就是说
            /**
             * 第一行获取的newUrl是为了兼容老版本的url，注册中心的地址是：/dubbo/config/configurators
             * 第二行行合并的是新版本的应用url，注册中心地址是：/dubbo/config/dubbo/dubbo-demo-annotation-provider.configurators
             * 第三行合并的是新版本的服务url，注册中心地址是：/dubbo/config/dubbo/org.apache.dubbo.demo.DemoService.configurators
             * 所以从这里可以知道优先级是老版本的最低，新版本的服务配置最高
             */
            URL newUrl = getConfigedInvokerUrl(configurators, currentUrl);
            newUrl = getConfigedInvokerUrl(providerConfigurationListener.getConfigurators(), newUrl);
            newUrl = getConfigedInvokerUrl(serviceConfigurationListeners.get(originUrl.getServiceKey())
                    .getConfigurators(), newUrl);
            //然后判断当前使用的url和远程获取的是否发生变化，如果发现变化就需要进行重新导出，调用reExport
            if (!currentUrl.equals(newUrl)) {
                //重新导出的时候传入的url就是新获取的url
                RegistryProtocol.this.reExport(originInvoker, newUrl);
                logger.info("exported provider url changed, origin url: " + originUrl +
                        ", old export url: " + currentUrl + ", new export url: " + newUrl);
            }
        }

        private List<URL> getMatchedUrls(List<URL> configuratorUrls, URL currentSubscribe) {
            List<URL> result = new ArrayList<URL>();
            for (URL url : configuratorUrls) {
                URL overrideUrl = url;
                // Compatible with the old version
                if (url.getParameter(CATEGORY_KEY) == null && OVERRIDE_PROTOCOL.equals(url.getProtocol())) {
                    overrideUrl = url.addParameter(CATEGORY_KEY, CONFIGURATORS_CATEGORY);
                }

                // Check whether url is to be applied to the current service
                if (UrlUtils.isMatch(currentSubscribe, overrideUrl)) {
                    result.add(url);
                }
            }
            return result;
        }
    }

    private class ServiceConfigurationListener extends AbstractConfiguratorListener {
        private URL providerUrl;
        private OverrideListener notifyListener;

        public ServiceConfigurationListener(URL providerUrl, OverrideListener notifyListener) {
            this.providerUrl = providerUrl;
            this.notifyListener = notifyListener;
            this.initWith(DynamicConfiguration.getRuleKey(providerUrl) + CONFIGURATORS_SUFFIX);
        }

        private <T> URL overrideUrl(URL providerUrl) {
            return RegistryProtocol.getConfigedInvokerUrl(configurators, providerUrl);
        }

        @Override
        protected void notifyOverrides() {
            notifyListener.doOverrideIfNecessary();
        }
    }

    private class ProviderConfigurationListener extends AbstractConfiguratorListener {

        public ProviderConfigurationListener() {
            //拼接的配置是dubbo-demo-annotation-provider.configurators，然后去注册中心（zk）读取这个动态配置的内容
            this.initWith(ApplicationModel.getApplication() + CONFIGURATORS_SUFFIX);
        }

        /**
         * Get existing configuration rule and override provider url before exporting.
         *
         * @param providerUrl
         * @param <T>
         * @return
         */
        private <T> URL overrideUrl(URL providerUrl) {
            return RegistryProtocol.getConfigedInvokerUrl(configurators, providerUrl);
        }

        @Override
        protected void notifyOverrides() {
            overrideListeners.values().forEach(listener -> ((OverrideListener) listener).doOverrideIfNecessary());
        }
    }

    /**
     * exporter proxy, establish the corresponding relationship between the returned exporter and the exporter
     * exported by the protocol, and can modify the relationship at the time of override.
     *
     * @param <T>
     */
    private class ExporterChangeableWrapper<T> implements Exporter<T> {

        private final ExecutorService executor = newSingleThreadExecutor(new NamedThreadFactory("Exporter-Unexport", true));

        private final Invoker<T> originInvoker;
        private Exporter<T> exporter;
        private URL subscribeUrl;
        private URL registerUrl;

        public ExporterChangeableWrapper(Exporter<T> exporter, Invoker<T> originInvoker) {
            this.exporter = exporter;
            this.originInvoker = originInvoker;
        }

        public Invoker<T> getOriginInvoker() {
            return originInvoker;
        }

        @Override
        public Invoker<T> getInvoker() {
            return exporter.getInvoker();
        }

        public void setExporter(Exporter<T> exporter) {
            this.exporter = exporter;
        }

        @Override
        public void unexport() {
            String key = getCacheKey(this.originInvoker);
            bounds.remove(key);

            Registry registry = RegistryProtocol.this.getRegistry(originInvoker);
            try {
                registry.unregister(registerUrl);
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
            try {
                NotifyListener listener = RegistryProtocol.this.overrideListeners.remove(subscribeUrl);
                registry.unsubscribe(subscribeUrl, listener);
                ExtensionLoader.getExtensionLoader(GovernanceRuleRepository.class).getDefaultExtension()
                        .removeListener(subscribeUrl.getServiceKey() + CONFIGURATORS_SUFFIX,
                                serviceConfigurationListeners.get(subscribeUrl.getServiceKey()));
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }

            executor.submit(() -> {
                try {
                    int timeout = ConfigurationUtils.getServerShutdownTimeout();
                    if (timeout > 0) {
                        logger.info("Waiting " + timeout + "ms for registry to notify all consumers before unexport. " +
                                "Usually, this is called when you use dubbo API");
                        Thread.sleep(timeout);
                    }
                    exporter.unexport();
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            });
        }

        public void setSubscribeUrl(URL subscribeUrl) {
            this.subscribeUrl = subscribeUrl;
        }

        public void setRegisterUrl(URL registerUrl) {
            this.registerUrl = registerUrl;
        }

        public URL getRegisterUrl() {
            return registerUrl;
        }
    }

    // for unit test
    private static RegistryProtocol INSTANCE;

    // for unit test
    public RegistryProtocol() {
        INSTANCE = this;
    }

    // for unit test
    public static RegistryProtocol getRegistryProtocol() {
        if (INSTANCE == null) {
            ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(REGISTRY_PROTOCOL); // load
        }
        return INSTANCE;
    }
}
